package zzu.xjc.http;

import zzu.xjc.http.util.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
 * @Author Xiejc
 * @Date 2021/8/6 11:30
 * @Description
 * @Since version-1.0
 */
public class ThreadWorker {
    private int index;
    private final static int BufSize = 1024;
    private Selector selector;
    private Thread thread;
    private HttpServiceExecutor executor;
    private BufferPool bufferPool;

    private final HashMap<SelectionKey,LinkedList<ByteBuffer>> bufferMap = new HashMap<>();

    void inject(HttpServiceExecutor executor,BufferPool bufferPool){
        this.executor = executor;
        this.bufferPool = bufferPool;
    }
    ThreadWorker(int index) throws IOException {
        this.index = index;
        selector = Selector.open();
    }
    void register(SocketChannel channel){
        try {
            channel.register(selector, SelectionKey.OP_READ);
            Logger.info("Worker-" + index + " Register Read Event, Address:" + Http.getChannelAddress(channel));
            selector.wakeup();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    void start(){
        thread = new Thread(this::work,"Worker-" + index);
        thread.start();
    }
    private void work(){
        Logger.info("Worker Start");
        try{
            while (true){
                int pass = selector.select();
                if (pass <= 0){
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()){
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isReadable()){
                        handleRequest(key);
                    }else if (key.isWritable() && key.isValid()){
                        respond(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private void handleRequest(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        LinkedList<ByteBuffer> list;
        if (bufferMap.containsKey(key)) {
            list = bufferMap.get(key);
            if (list == null){
                Logger.warn("ThreadWorker -> handleRequest() -> list == null");
            }
        } else {
            /* First Read */
            list = new LinkedList<>();
            bufferMap.put(key,list);
        }
        assert list != null;
        Logger.info("Read and Execute from " + Http.getChannelAddress(channel));
        /* Read Data to Buffer 读取数据到Buffer */
        ByteBuffer buffer = null;
        int read = 0;
        do {
            buffer = bufferPool.allocate();
            read = channel.read(buffer);
            if (read > 0){
                list.push(buffer);
            }else {
                bufferPool.giveback(buffer);
            }
        }while (read > 0);
        bufferPool.log();
        if (read == 0){
//            Logger.info("ThreadWorker -> handleRequest() -> read == 0");
        }
        if (read == -1){
//            Logger.info("ThreadWorker -> handleRequest() -> read == -1");
            channel.close();
            key.cancel();
            bufferMap.remove(key);
            return;
        }
        list.forEach(ByteBuffer::flip);
//        printBufferList(list);list.forEach(ByteBuffer::rewind);

        /* Http Protocol Parse Http协议解析 */

        HttpRequest request = HttpProtocolParser.parse(list);
        bufferPool.giveback(list); // give back ByteBuffer List to BufferPool

        /* Service Callback Method Execute to Get Response 服务回调方法执行获取回复 */

        HttpResponse response = executor.execute(request);
        key.attach(response.toBuffer(bufferPool));
        bufferPool.log();

        /* Finish Phase 收尾 */
        bufferMap.remove(key);
        key.interestOps(SelectionKey.OP_WRITE);
        channel.shutdownInput();
        selector.wakeup();
    }

    private void respond(SelectionKey key) throws IOException {
        Logger.info("Write Event from " + Http.getChannelAddress((SocketChannel) key.channel()));
        SocketChannel channel = (SocketChannel) key.channel();

        LinkedList<ByteBuffer> bufferList = (LinkedList<ByteBuffer>) key.attachment();
        Iterator<ByteBuffer> iter = bufferList.iterator();
        do {
            ByteBuffer buffer = iter.next();
            int len;
            do{
                len = channel.write(buffer);
            }while (len > 0);

            if (buffer.hasRemaining()){
                if (len == 0) {
                /* buffer仍有剩余字节且 len == 0 表示通道不能再写入数据，可能是因为操作系统写入缓冲区已满或其他问题，所以应该稍后写入。
                   buffer.hasRemaining && len == 0 means channel can no longer write data ,
                   maybe because OS write buffer is full or other problem ,
                   so it should write later. */
                    key.interestOps(SelectionKey.OP_WRITE);
                    selector.wakeup();
                    return;
                }
                if (len == -1){
                    break;
                }
            }
            // 此时这个buffer中的所有字节都已经被读取了 Now All bytes in this buffer have been read
            iter.remove(); // 如果之后的循环重新 register Write Ops , remove 可以保证不会丢失循环的位置导致重复读取
            bufferPool.giveback(buffer);
            Logger.info("Buffer give back " + bufferPool.log());
        } while (iter.hasNext());

        /* 所有buffer的数据已经被完全写入  Data in all buffer were written fully */
        channel.close();
        key.cancel();
        selector.wakeup();
    }

    private void printBufferList(List<ByteBuffer> bufferList){
        for (ByteBuffer byteBuffer : bufferList) {
            System.out.println(StandardCharsets.US_ASCII.decode(byteBuffer).toString());
        }
    }
}
